home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Freelog 125
/
Freelog_MarsAvril2015_No125.iso
/
Musique
/
Quod Libet
/
quodlibet-3.3.0-installer.exe
/
bin
/
quodlibet
/
ext
/
events
/
mpdserver
/
main.pyc
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2014-12-31
|
31KB
|
876 lines
# Source Generated with Decompyle++
# File: in.pyc (Python 2.7)
import re
import shlex
from gi.repository import GObject
from quodlibet import const
from tcpserver import BaseTCPServer, BaseTCPConnection
class AckError(object):
NOT_LIST = 1
ARG = 2
PASSWORD = 3
PERMISSION = 4
UNKNOWN = 5
NO_EXIST = 50
PLAYLIST_MAX = 51
SYSTEM = 52
PLAYLIST_LOAD = 53
UPDATE_ALREADY = 54
PLAYER_SYNC = 55
EXIST = 56
TAG_MAPPING = [
(u'Artist', 'artist'),
(u'ArtistSort', 'artistsort'),
(u'Album', 'album'),
(u'AlbumArtist', 'albumartist'),
(u'AlbumArtistSort', 'albumartistsort'),
(u'Title', 'title'),
(u'Track', 'tracknumber'),
(u'Name', ''),
(u'Genre', 'genre'),
(u'Date', '~year'),
(u'Composer', 'composer'),
(u'Performer', 'performer'),
(u'Comment', 'commend'),
(u'Disc', 'discnumber'),
(u'Time', '~#length'),
(u'Name', '~basename'),
(u'MUSICBRAINZ_ARTISTID', 'musicbrainz_artistid'),
(u'MUSICBRAINZ_ALBUMID', 'musicbrainz_albumid'),
(u'MUSICBRAINZ_ALBUMARTISTID', 'musicbrainz_albumartistid'),
(u'MUSICBRAINZ_TRACKID', 'musicbrainz_trackid')]
def format_tags(song):
'''Gives a tag list message for a song'''
lines = []
for mpd_key, ql_key in TAG_MAPPING:
if not ql_key:
continue
value = song.comma(ql_key) if ql_key.startswith('~#') else None
if value is not None:
lines.append(u'%s: %s' % (mpd_key, value))
continue
return u'\n'.join(lines)
class ParseError(Exception):
pass
def parse_command(line):
'''Parses a MPD command (without trailing newline)
Returns (command, [arguments]) or raises ParseError in case of an error.
'''
if not isinstance(line, bytes):
raise AssertionError
parts = None.split('[ \\t]+', line, maxsplit = 1)
if not parts:
raise ParseError('empty command')
command = parts[0]
if len(parts) > 1:
lex = shlex.shlex(parts[1], posix = True)
lex.whitespace_split = True
lex.commenters = ''
lex.quotes = '"'
lex.whitespace = ' \t'
args = list(lex)
else:
args = []
try:
command = command.decode('utf-8')
except ValueError:
e = None
raise ParseError(e)
dec_args = []
for arg in args:
try:
arg = arg.decode('utf-8')
except ValueError:
e = None
raise ParseError(e)
dec_args.append(arg)
return (command, dec_args)
class PlayerOptions(GObject.Object):
'''Provides a simplified interface for playback options.
This should probably go into the core.
'''
__gsignals__ = {
'random-changed': (GObject.SignalFlags.RUN_LAST, None, tuple()),
'repeat-changed': (GObject.SignalFlags.RUN_LAST, None, tuple()),
'single-changed': (GObject.SignalFlags.RUN_LAST, None, tuple()) }
def __init__(self, app):
super(PlayerOptions, self).__init__()
self._repeat = app.window.repeat
self._rid = self._repeat.connect(('toggled',), (lambda : self.emit('repeat-changed')))
def order_changed(*args):
self.emit('random-changed')
self.emit('single-changed')
self._order = app.window.order
self._oid = self._order.connect('changed', order_changed)
def destroy(self):
self._repeat.disconnect(self._rid)
del self._repeat
self._order.disconnect(self._oid)
del self._order
def set_single(self, value):
is_single = self.get_single()
if value and not is_single:
self._order.set_active_by_name('onesong')
elif not value and is_single:
self._order.set_active_by_name('inorder')
def get_single(self):
return self._order.get_active_name() == 'onesong'
def get_random(self):
return self._order.get_shuffle()
def set_random(self, value):
self._order.set_shuffle(value)
def get_repeat(self):
return self._repeat.get_active()
def set_repeat(self, value):
self._repeat.set_active(value)
class MPDService(object):
'''This is the actual shared MPD service which the clients talk to'''
version = (0, 17, 0)
def __init__(self, app):
self._app = app
self._connections = set()
self._idle_subscriptions = { }
self._pl_ver = 0
self._options = PlayerOptions(app)
def options_changed(*args):
self.emit_changed('options')
self._options.connect('random-changed', options_changed)
self._options.connect('repeat-changed', options_changed)
self._options.connect('single-changed', options_changed)
self._player_sigs = []
def volume_changed(*args):
self.emit_changed('mixer')
id_ = app.player.connect('notify::volume', volume_changed)
self._player_sigs.append(id_)
def player_changed(*args):
self.emit_changed('player')
id_ = app.player.connect('paused', player_changed)
self._player_sigs.append(id_)
id_ = app.player.connect('unpaused', player_changed)
self._player_sigs.append(id_)
id_ = app.player.connect('seek', player_changed)
self._player_sigs.append(id_)
def playlist_changed(*args):
self._pl_ver += 1
self.emit_changed('playlist')
id_ = app.player.connect('song-started', playlist_changed)
self._player_sigs.append(id_)
def _get_id(self, info):
return (id(info) & 0xFFFFFFFFL) >> 1
def destroy(self):
for id_ in self._player_sigs:
self._app.player.disconnect(id_)
self._options.destroy()
del self._app
del self._options
def add_connection(self, connection):
self._connections.add(connection)
def remove_connection(self, connection):
self._idle_subscriptions.pop(connection, None)
self._connections.remove(connection)
def register_idle(self, connection, subsystems):
self._idle_subscriptions[connection] = subsystems
def unregister_idle(self, connection):
try:
del self._idle_subscriptions[connection]
except KeyError:
pass
def emit_changed(self, subsystem):
for conn, subs in self._idle_subscriptions.iteritems():
if not not subs:
if subsystem in subs:
line = u'changed: %s' % subsystem
conn.log(u'<- ' + line)
conn.write_line(line)
conn.ok()
conn.start_write()
continue
return None
def play(self):
if not self._app.player.song:
self._app.player.reset()
else:
self._app.player.paused = False
def playid(self, songid):
self.play()
def pause(self, value = None):
if value is None:
self._app.player.paused = not (self._app.player.paused)
else:
self._app.player.paused = value
def stop(self):
self._app.player.stop()
def next(self):
self._app.player.next()
def previous(self):
self._app.player.previous()
def seek(self, songpos, time_):
'''time_ in seconds'''
self._app.player.seek(time_ * 1000)
def seekid(self, songid, time_):
'''time_ in seconds'''
self._app.player.seek(time_ * 1000)
def seekcur(self, value, relative):
if relative:
pos = self._app.player.get_position()
self._app.player.seek(pos + value)
else:
self._app.player.seek(value)
def setvol(self, value):
'''value: 0..100'''
self._app.player.volume = value / 100
def repeat(self, value):
self._options.set_repeat(value)
def random(self, value):
self._options.set_random(value)
def single(self, value):
self._options.set_single(value)
def stats(self):
has_song = int(bool(self._app.player.info))
stats = [
('artists', has_song),
('albums', has_song),
('songs', has_song),
('uptime', 1),
('playtime', 1),
('db_playtime', 1),
('db_update', 1252868674)]
return stats
def status(self):
app = self._app
info = app.player.info
if info:
if app.player.paused:
state = 'pause'
else:
state = 'play'
else:
state = 'stop'
status = [
('volume', int(app.player.volume * 100)),
('repeat', int(self._options.get_repeat())),
('random', int(self._options.get_random())),
('single', int(self._options.get_single())),
('consume', 0),
('playlist', self._pl_ver),
('playlistlength', int(bool(app.player.info))),
('mixrampdb', 0),
('state', state)]
if info:
total_time = int(info('~#length'))
elapsed_time = int(app.player.get_position() / 1000)
elapsed_exact = '%1.3f' % app.player.get_position() / 1000
status.extend([
('song', 0),
('songid', self._get_id(info))])
if state != 'stop':
status.extend([
('time', '%d:%d' % (elapsed_time, total_time)),
('elapsed', elapsed_exact),
('bitrate', info('~#bitrate'))])
return status
def currentsong(self):
info = self._app.player.info
if info is None:
return None
parts = None
parts.append(u'file: %s' % info('~filename'))
parts.append(format_tags(info))
parts.append(u'Pos: 0')
parts.append(u'Id: %d' % self._get_id(info))
return u'\n'.join(parts)
def playlistinfo(self, start = None, end = None):
if start is not None and start > 1:
return None
return None.currentsong()
def playlistid(self, songid = None):
return self.currentsong()
def plchanges(self, version):
if version != self._pl_ver:
return self.currentsong()
def plchangesposid(self, version):
info = self._app.player.info
if version != self._pl_ver and info:
parts = []
parts.append(u'file: %s' % info('~filename'))
parts.append(u'Pos: 0')
parts.append(u'Id: %d' % self._get_id(info))
return u'\n'.join(parts)
class MPDServer(BaseTCPServer):
def __init__(self, app, port):
self._app = app
super(MPDServer, self).__init__(port, MPDConnection, const.DEBUG)
def handle_init(self):
print_d('Creating the MPD service')
self.service = MPDService(self._app)
def handle_idle(self):
print_d('Destroying the MPD service')
self.service.destroy()
del self.service
def log(self, msg):
print_d(msg)
class MPDRequestError(Exception):
def __init__(self, msg, code = AckError.UNKNOWN, index = None):
self.msg = msg
self.code = code
self.index = index
class MPDConnection(BaseTCPConnection):
def handle_init(self, server):
service = server.service
self.service = service
service.add_connection(self)
str_version = '.'.join(map(str, service.version))
self._buf = bytearray('OK MPD %s\n' % str_version)
self._read_buf = bytearray()
self._use_command_list = False
self._command_list_ok = False
self._command_list = []
self._command = None
self.start_write()
self.start_read()
def handle_read(self, data):
self._feed_data(data)
while None:
line = self._get_next_line()
if line is None:
break
try:
(cmd, args) = parse_command(line)
except ParseError:
continue
try:
self._handle_command(cmd, args)
continue
except MPDRequestError:
e = None
self._error(e.msg, e.code, e.index)
self._use_command_list = False
del self._command_list[:]
continue
return None
def handle_write(self):
data = self._buf[:]
del self._buf[:]
return data
def can_write(self):
return bool(self._buf)
def handle_close(self):
self.log('connection closed')
self.service.remove_connection(self)
del self.service
def log(self, msg):
if const.DEBUG:
print_d('[%s] %s' % (self.name, msg))
def _feed_data(self, new_data):
'''Feed new data into the read buffer'''
self._read_buf.extend(new_data)
def _get_next_line(self):
'''Returns the next line from the read buffer or None'''
try:
index = self._read_buf.index('\n')
except ValueError:
return None
line = bytes(self._read_buf[:index])
del self._read_buf[:index + 1]
return line
def write_line(self, line):
'''Writes a line to the client'''
if not isinstance(line, unicode):
raise AssertionError
None._buf.extend(line.encode('utf-8', errors = 'replace') + '\n')
def ok(self):
self.write_line(u'OK')
def _error(self, msg, code, index):
error = []
error.append(u'ACK [%d' % code)
if index is not None:
error.append(u'@%d' % index)
if not self._command is not None:
raise AssertionError
None.append('u] {%s}' % self._command)
if msg is not None:
error.append(u' %s' % msg)
self.write_line(u''.join(error))
def _handle_command(self, command, args):
self._command = command
if command == u'command_list_end':
if not self._use_command_list:
self._error(u'list_end without begin')
return None
for cmd, args in enumerate(self._command_list):
try:
self._exec_command(cmd, args)
continue
except MPDRequestError:
e = None
raise MPDRequestError(e.msg, e.code, i)
continue
else:
self._use_command_list = False
del self._command_list[:]
return None
if None in (u'command_list_begin', u'command_list_ok_begin'):
if self._use_command_list:
raise MPDRequestError(u'begin without end')
self._use_command_list = True
self._command_list_ok = command == u'command_list_ok_begin'
if not not (self._command_list):
raise AssertionError
return None
if None._use_command_list:
self._command_list.append((command, args))
else:
self._exec_command(command, args)
def _exec_command(self, command, args, no_ack = False):
self._command = command
if command not in self._commands:
print_w('Unhandled command %r, sending OK.' % command)
command = 'ping'
if not self._use_command_list:
self.ok()
elif self._command_list_ok:
self.write_line(u'list_OK')
return None
(cmd, do_ack) = None._commands[command]
cmd(self, self.service, args)
if self._use_command_list or self._command_list_ok:
self.write_line(u'list_OK')
if do_ack:
self.ok()
_commands = { }
def Command(cls, name, ack = True):
def wrap(func):
if not name not in cls._commands:
raise AssertionError(name)
cls._commands[name] = (None, ack)
return func
return wrap
Command = classmethod(Command)
def list_commands(cls):
'''A list of supported commands'''
return cls._commands.keys()
list_commands = classmethod(list_commands)
def _verify_length(args, length):
if not len(args) >= length:
raise MPDRequestError('Wrong arg count')
def _parse_int(arg):
try:
return int(arg)
except ValueError:
raise MPDRequestError('invalid arg')
def _parse_bool(arg):
try:
value = int(arg)
if value not in (0, 1):
raise ValueError
except ValueError:
raise MPDRequestError('invalid arg')
return bool(value)
def _parse_range(arg):
try:
values = [ int(v) for v in arg.split(':') ]
except ValueError:
raise MPDRequestError('arg in range not a number')
if len(values) == 1:
return (values[0], values[0] + 1)
if None(values) == 2:
return values
raise None('invalid range')
def _cmd_idle(conn, service, args):
service.register_idle(conn, args)
_cmd_idle = MPDConnection.Command('idle', ack = False)(_cmd_idle)
def _cmd_ping(conn, service, args):
pass
_cmd_ping = MPDConnection.Command('ping')(_cmd_ping)
def _cmd_noidle(conn, service, args):
service.unregister_idle(conn)
_cmd_noidle = MPDConnection.Command('noidle')(_cmd_noidle)
def _cmd_close(conn, service, args):
conn.close()
_cmd_close = MPDConnection.Command('close', ack = False)(_cmd_close)
def _cmd_play(conn, service, args):
service.play()
_cmd_play = MPDConnection.Command('play')(_cmd_play)
def _cmd_playid(conn, service, args):
_verify_length(args, 1)
songid = _parse_int(args[0])
service.playid(songid)
_cmd_playid = MPDConnection.Command('playid')(_cmd_playid)
def _cmd_pause(conn, service, args):
value = None
if args:
_verify_length(args, 1)
value = _parse_bool(args[0])
service.pause(value)
_cmd_pause = MPDConnection.Command('pause')(_cmd_pause)
def _cmd_stop(conn, service, args):
service.stop()
_cmd_stop = MPDConnection.Command('stop')(_cmd_stop)
def _cmd_next(conn, service, args):
service.next()
_cmd_next = MPDConnection.Command('next')(_cmd_next)
def _cmd_previous(conn, service, args):
service.previous()
_cmd_previous = MPDConnection.Command('previous')(_cmd_previous)
def _cmd_repeat(conn, service, args):
_verify_length(args, 1)
value = _parse_bool(args[0])
service.repeat(value)
_cmd_repeat = MPDConnection.Command('repeat')(_cmd_repeat)
def _cmd_random(conn, service, args):
_verify_length(args, 1)
value = _parse_bool(args[0])
service.random(value)
_cmd_random = MPDConnection.Command('random')(_cmd_random)
def _cmd_single(conn, service, args):
_verify_length(args, 1)
value = _parse_bool(args[0])
service.single(value)
_cmd_single = MPDConnection.Command('single')(_cmd_single)
def _cmd_setvol(conn, service, args):
_verify_length(args, 1)
value = _parse_int(args[0])
service.setvol(value)
_cmd_setvol = MPDConnection.Command('setvol')(_cmd_setvol)
def _cmd_status(conn, service, args):
status = service.status()
for k, v in status:
conn.write_line(u'%s: %s' % (k, v))
_cmd_status = MPDConnection.Command('status')(_cmd_status)
def _cmd_stats(conn, service, args):
status = service.stats()
for k, v in status:
conn.write_line(u'%s: %s' % (k, v))
_cmd_stats = MPDConnection.Command('stats')(_cmd_stats)
def _cmd_currentsong(conn, service, args):
stats = service.currentsong()
if stats is not None:
conn.write_line(stats)
_cmd_currentsong = MPDConnection.Command('currentsong')(_cmd_currentsong)
def _cmd_count(conn, service, args):
conn.write_line(u'songs: 0')
conn.write_line(u'playtime: 0')
_cmd_count = MPDConnection.Command('count')(_cmd_count)
def _cmd_plchanges(conn, service, args):
_verify_length(args, 1)
version = _parse_int(args[0])
changes = service.plchanges(version)
if changes is not None:
conn.write_line(changes)
_cmd_plchanges = MPDConnection.Command('plchanges')(_cmd_plchanges)
def _cmd_plchangesposid(conn, service, args):
_verify_length(args, 1)
version = _parse_int(args[0])
changes = service.plchangesposid(version)
if changes is not None:
conn.write_line(changes)
_cmd_plchangesposid = MPDConnection.Command('plchangesposid')(_cmd_plchangesposid)
def _cmd_listallinfo(*args):
_cmd_currentsong(*args)
_cmd_listallinfo = MPDConnection.Command('listallinfo')(_cmd_listallinfo)
def _cmd_seek(conn, service, args):
_verify_length(args, 2)
songpos = _parse_int(args[0])
time_ = _parse_int(args[1])
service.seek(songpos, time_)
_cmd_seek = MPDConnection.Command('seek')(_cmd_seek)
def _cmd_seekid(conn, service, args):
_verify_length(args, 2)
songid = _parse_int(args[0])
time_ = _parse_int(args[1])
service.seekid(songid, time_)
_cmd_seekid = MPDConnection.Command('seekid')(_cmd_seekid)
def _cmd_seekcur(conn, service, args):
_verify_length(args, 1)
relative = False
time_ = args[0]
if time_.startswith(('+', '-')):
relative = True
try:
time_ = int(time_)
except ValueError:
raise MPDRequestError('arg not a number')
service.seekid(time_, relative)
_cmd_seekcur = MPDConnection.Command('seekcur')(_cmd_seekcur)
def _cmd_outputs(conn, service, args):
conn.write_line(u'outputid: 0')
conn.write_line(u'outputname: dummy')
conn.write_line(u'outputenabled: 1')
_cmd_outputs = MPDConnection.Command('outputs')(_cmd_outputs)
def _cmd_commands(conn, service, args):
for name in conn.list_commands():
conn.write_line(u'command: ' + unicode(name))
_cmd_commands = MPDConnection.Command('commands')(_cmd_commands)
def _cmd_tagtypes(conn, service, args):
for mpd_key, ql_key in TAG_MAPPING:
if ql_key:
conn.write_line(mpd_key)
continue
_cmd_tagtypes = MPDConnection.Command('tagtypes')(_cmd_tagtypes)
def _cmd_lsinfo(conn, service, args):
_verify_length(args, 1)
_cmd_lsinfo = MPDConnection.Command('lsinfo')(_cmd_lsinfo)
def _cmd_playlistinfo(conn, service, args):
if args:
_verify_length(args, 1)
(start, end) = _parse_range(args[0])
result = service.playlistinfo(start, end)
else:
result = service.playlistinfo()
if result is not None:
conn.write_line(result)
_cmd_playlistinfo = MPDConnection.Command('playlistinfo')(_cmd_playlistinfo)
def _cmd_playlistid(conn, service, args):
if args:
songid = _parse_int(args[0])
else:
songid = None
result = service.playlistid(songid)
if result is not None:
conn.write_line(result)
_cmd_playlistid = MPDConnection.Command('playlistid')(_cmd_playlistid)